feat: Add CDC sync checkpointing based on time or records #21727

Merged
sergio-ropero merged 42 commits into master from sergio/feat/cdc-checkpointing
Mar 13, 2023

@sergio-ropero sergio-ropero commented Jan 23, 2023

This PR adds checkpointing during CDC synchronisation: #21009

To do so, instead of creating a new iterator on top of the current one, we change the way we process the events.
We iterate over the ChangeEvents and check that:

  • The record is not part of the snapshot
  • The LSN of the record is higher than the one read by Debezium
  • More than 10,000 records have been sent, or more than 15 minutes have passed, since the last state message (both values can change)

If all of the conditions above are met, a new state message is sent in the next iteration. If that is also the end of the iterator, the state message is sent only once.
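
The three conditions above can be sketched as a single predicate. This is a minimal illustration in Python, not the PR's code: `ChangeEvent`, `should_checkpoint`, and their fields are hypothetical names, and treating the thresholds as inclusive (`>=`) is an assumption.

```python
from dataclasses import dataclass

# Thresholds quoted in the PR description; the description notes they can change.
MAX_RECORDS = 10_000
MAX_SECONDS = 15 * 60


@dataclass
class ChangeEvent:
    """Hypothetical stand-in for a Debezium change event."""
    lsn: int           # log sequence number of the record
    is_snapshot: bool  # True while the initial snapshot is being loaded


def should_checkpoint(event, offset_lsn, records_since_state, seconds_since_state):
    """Return True only when all three conditions from the description hold."""
    # Assumption: a threshold counts as reached when it is met exactly.
    threshold_reached = (
        records_since_state >= MAX_RECORDS
        or seconds_since_state >= MAX_SECONDS
    )
    return (
        not event.is_snapshot          # record is not part of the snapshot
        and event.lsn > offset_lsn     # record is ahead of the offset read by Debezium
        and threshold_reached          # enough records or enough time since last state
    )
```

Keeping the decision in one side-effect-free function makes it easy to unit test each condition separately, which matters because any one of them can suppress a checkpoint.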

⚠️⚠️⚠️⚠️⚠️⚠️
A checkpoint may NOT be saved every X records/seconds as expected.
The process needs to validate other conditions before sending the STATE message (for example, that this is not a snapshot load, or that the record being published is actually after the offset about to be sent).
⚠️⚠️⚠️⚠️⚠️⚠️
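
To illustrate why a checkpoint can arrive later than the configured interval, here is a small Python sketch (not the connector's implementation). The function name, the `(lsn, is_snapshot)` tuple shape, and carrying the triggering record's LSN in the STATE message are all assumptions made for the example; the time-based threshold is left out for brevity.

```python
def emit_with_checkpoints(events, offset_lsn, max_records):
    """Yield ("RECORD", lsn) per event, plus ("STATE", lsn) when a checkpoint is due and safe.

    events: iterable of (lsn, is_snapshot) tuples (hypothetical shape).
    """
    since_state = 0
    last_state_lsn = None
    for lsn, is_snapshot in events:
        yield ("RECORD", lsn)
        since_state += 1

        due = since_state >= max_records
        # A due checkpoint is deferred while the extra conditions do not hold.
        safe = (not is_snapshot) and lsn > offset_lsn and lsn != last_state_lsn
        if due and safe:
            yield ("STATE", lsn)
            last_state_lsn = lsn
            since_state = 0


# Three snapshot records followed by three streamed records, checkpoint every 2 records.
events = [(1, True), (2, True), (3, True), (4, False), (5, False), (6, False)]
messages = list(emit_with_checkpoints(events, offset_lsn=0, max_records=2))
states = [lsn for kind, lsn in messages if kind == "STATE"]
```

With this input the threshold is already reached at the second record, but no STATE is emitted until the first non-snapshot record, so `states` ends up holding LSNs 4 and 6 rather than 2, 4, and 6.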

…ing CDC synchronization.

For that purpose we encapsulate an AirbyteMessage Iterator on a new iterator that handles the
checkpoint messaging.

github-actions bot commented Jan 23, 2023

Affected Connector Report

NOTE ⚠️ Changes in this PR affect the following connectors. Make sure to do the following as needed:

  • Run integration tests
  • Bump connector or module version
  • Add changelog
  • Publish the new version

✅ Sources (8)

| Connector | Version | Changelog | Publish |
| --- | --- | --- | --- |
| source-alloydb | 2.0.2 | | |
| source-alloydb-strict-encrypt | 2.0.2 | 🔵 (ignored) | 🔵 (ignored) |
| source-mssql | 1.0.0 | | |
| source-mssql-strict-encrypt | 1.0.0 | 🔵 (ignored) | 🔵 (ignored) |
| source-mysql | 2.0.0 | | |
| source-mysql-strict-encrypt | 2.0.0 | 🔵 (ignored) | 🔵 (ignored) |
| source-postgres | 2.0.2 | | |
| source-postgres-strict-encrypt | 2.0.2 | 🔵 (ignored) | 🔵 (ignored) |

  • See "Actionable Items" below for how to resolve warnings and errors.

✅ Destinations (0)

Connector Version Changelog Publish
  • See "Actionable Items" below for how to resolve warnings and errors.

✅ Other Modules (0)

Actionable Items

(click to expand)

| Category | Status | Actionable Item |
| --- | --- | --- |
| Version | mismatch | The version of the connector is different from its normal variant. Please bump the version of the connector. |
| Version | doc not found | The connector does not seem to have a documentation file. This can be normal (e.g. basic connector like source-jdbc is not published or documented). Please double-check to make sure that it is not a bug. |
| Changelog | doc not found | The connector does not seem to have a documentation file. This can be normal (e.g. basic connector like source-jdbc is not published or documented). Please double-check to make sure that it is not a bug. |
| Changelog | changelog missing | There is no changelog for the current version of the connector. If you are the author of the current version, please add a changelog. |
| Publish | not in seed | The connector is not in the seed file (e.g. source_definitions.yaml), so its publication status cannot be checked. This can be normal (e.g. some connectors are cloud-specific, and only listed in the cloud seed file). Please double-check to make sure that it is not a bug. |
| Publish | diff seed version | The connector exists in the seed file, but the latest version is not listed there. This usually means that the latest version is not published. Please use the /publish command to publish the latest version. |

@sergio-ropero sergio-ropero temporarily deployed to more-secrets January 27, 2023 14:43 — with GitHub Actions Inactive
@sergio-ropero sergio-ropero temporarily deployed to more-secrets January 27, 2023 16:10 — with GitHub Actions Inactive
@sergio-ropero sergio-ropero temporarily deployed to more-secrets January 27, 2023 16:11 — with GitHub Actions Inactive

github-actions bot commented Jan 27, 2023

Airbyte Code Coverage

There is no coverage information present for the Files changed

Total Project Coverage 24.66%

@sergio-ropero sergio-ropero temporarily deployed to more-secrets January 30, 2023 16:49 — with GitHub Actions Inactive
@sergio-ropero sergio-ropero marked this pull request as ready for review January 30, 2023 17:27
@sergio-ropero sergio-ropero requested a review from a team as a code owner January 30, 2023 17:27
@sergio-ropero sergio-ropero temporarily deployed to more-secrets January 30, 2023 17:30 — with GitHub Actions Inactive
@sergio-ropero sergio-ropero temporarily deployed to more-secrets January 30, 2023 17:48 — with GitHub Actions Inactive

sergio-ropero commented Mar 10, 2023

/test connector=connectors/source-postgres

🕑 connectors/source-postgres https://github.com/airbytehq/airbyte/actions/runs/4384082211
✅ connectors/source-postgres https://github.com/airbytehq/airbyte/actions/runs/4384082211
No Python unittests run

Build Passed

Test summary info:

=========================== short test summary info ============================
SKIPPED [1] ../usr/local/lib/python3.9/site-packages/connector_acceptance_test/plugin.py:63: Skipping TestIncremental.test_two_sequential_reads: not found in the config.
SKIPPED [2] ../usr/local/lib/python3.9/site-packages/connector_acceptance_test/tests/test_core.py:109: Backward compatibility tests are disabled for version 1.0.52.
SKIPPED [2] ../usr/local/lib/python3.9/site-packages/connector_acceptance_test/tests/test_core.py:509: The previous and actual discovered catalogs are identical.
=================== 62 passed, 5 skipped in 83.33s (0:01:23) ===================

sergio-ropero commented Mar 10, 2023

/test connector=connectors/source-postgres-strict-encrypt

🕑 connectors/source-postgres-strict-encrypt https://github.com/airbytehq/airbyte/actions/runs/4384084418
✅ connectors/source-postgres-strict-encrypt https://github.com/airbytehq/airbyte/actions/runs/4384084418
No Python unittests run

Build Passed

Test summary info:

All Passed

sergio-ropero commented Mar 10, 2023

/test connector=connectors/source-alloydb

🕑 connectors/source-alloydb https://github.com/airbytehq/airbyte/actions/runs/4384456937
✅ connectors/source-alloydb https://github.com/airbytehq/airbyte/actions/runs/4384456937
No Python unittests run

Build Passed

Test summary info:

All Passed

sergio-ropero commented Mar 10, 2023

/test connector=connectors/source-alloydb-strict-encrypt

🕑 connectors/source-alloydb-strict-encrypt https://github.com/airbytehq/airbyte/actions/runs/4384458239
✅ connectors/source-alloydb-strict-encrypt https://github.com/airbytehq/airbyte/actions/runs/4384458239
No Python unittests run

Build Passed

Test summary info:

All Passed

sergio-ropero commented Mar 10, 2023

/test connector=connectors/source-mysql

🕑 connectors/source-mysql https://github.com/airbytehq/airbyte/actions/runs/4384460846
✅ connectors/source-mysql https://github.com/airbytehq/airbyte/actions/runs/4384460846
No Python unittests run

Build Passed

Test summary info:

=========================== short test summary info ============================
SKIPPED [1] ../usr/local/lib/python3.9/site-packages/connector_acceptance_test/plugin.py:63: Skipping TestConnection.test_check: not found in the config.
SKIPPED [1] ../usr/local/lib/python3.9/site-packages/connector_acceptance_test/plugin.py:63: Skipping TestDiscovery.test_discover: not found in the config.
SKIPPED [1] ../usr/local/lib/python3.9/site-packages/connector_acceptance_test/plugin.py:63: Skipping TestBasicRead.test_read: not found in the config.
SKIPPED [1] ../usr/local/lib/python3.9/site-packages/connector_acceptance_test/plugin.py:63: Skipping TestFullRefresh.test_sequential_reads: not found in the config.
SKIPPED [1] ../usr/local/lib/python3.9/site-packages/connector_acceptance_test/plugin.py:63: Skipping TestIncremental.test_two_sequential_reads: not found in the config.
SKIPPED [1] ../usr/local/lib/python3.9/site-packages/connector_acceptance_test/tests/test_core.py:100: The previous and actual specifications are identical.
================= 19 passed, 6 skipped, 27 warnings in 16.84s ==================

sergio-ropero commented Mar 10, 2023

/test connector=connectors/source-mysql-strict-encrypt

🕑 connectors/source-mysql-strict-encrypt https://github.com/airbytehq/airbyte/actions/runs/4384462538
✅ connectors/source-mysql-strict-encrypt https://github.com/airbytehq/airbyte/actions/runs/4384462538
No Python unittests run

Build Passed

Test summary info:

All Passed

sergio-ropero commented Mar 10, 2023

/test connector=connectors/source-mssql

🕑 connectors/source-mssql https://github.com/airbytehq/airbyte/actions/runs/4384466767
✅ connectors/source-mssql https://github.com/airbytehq/airbyte/actions/runs/4384466767
No Python unittests run

Build Passed

Test summary info:

=========================== short test summary info ============================
SKIPPED [1] ../usr/local/lib/python3.9/site-packages/connector_acceptance_test/plugin.py:63: Skipping TestConnection.test_check: not found in the config.
SKIPPED [1] ../usr/local/lib/python3.9/site-packages/connector_acceptance_test/plugin.py:63: Skipping TestDiscovery.test_discover: not found in the config.
SKIPPED [1] ../usr/local/lib/python3.9/site-packages/connector_acceptance_test/plugin.py:63: Skipping TestBasicRead.test_read: not found in the config.
SKIPPED [1] ../usr/local/lib/python3.9/site-packages/connector_acceptance_test/plugin.py:63: Skipping TestFullRefresh.test_sequential_reads: not found in the config.
SKIPPED [1] ../usr/local/lib/python3.9/site-packages/connector_acceptance_test/plugin.py:63: Skipping TestIncremental.test_two_sequential_reads: not found in the config.
SKIPPED [1] ../usr/local/lib/python3.9/site-packages/connector_acceptance_test/tests/test_core.py:100: The previous and actual specifications are identical.
================= 19 passed, 6 skipped, 27 warnings in 17.49s ==================

sergio-ropero commented Mar 10, 2023

/test connector=connectors/source-mssql-strict-encrypt

🕑 connectors/source-mssql-strict-encrypt https://github.com/airbytehq/airbyte/actions/runs/4384468211
✅ connectors/source-mssql-strict-encrypt https://github.com/airbytehq/airbyte/actions/runs/4384468211
No Python unittests run

Build Passed

Test summary info:

All Passed

sergio-ropero commented Mar 13, 2023

/publish connector=connectors/source-postgres

🕑 Publishing the following connectors:
connectors/source-postgres
https://github.com/airbytehq/airbyte/actions/runs/4404098283


Connector Did it publish? Were definitions generated?
connectors/source-postgres

if you have connectors that successfully published but failed definition generation, follow step 4 here ▶️

sergio-ropero commented Mar 13, 2023

/publish connector=connectors/source-postgres-strict-encrypt auto-bump-version=false

🕑 Publishing the following connectors:
connectors/source-postgres-strict-encrypt
https://github.com/airbytehq/airbyte/actions/runs/4404098916


Connector Did it publish? Were definitions generated?
connectors/source-postgres-strict-encrypt

if you have connectors that successfully published but failed definition generation, follow step 4 here ▶️

sergio-ropero commented Mar 13, 2023

/publish connector=connectors/source-alloydb

🕑 Publishing the following connectors:
connectors/source-alloydb
https://github.com/airbytehq/airbyte/actions/runs/4404101299


Connector Did it publish? Were definitions generated?
connectors/source-alloydb

if you have connectors that successfully published but failed definition generation, follow step 4 here ▶️

sergio-ropero commented Mar 13, 2023

/publish connector=connectors/source-alloydb-strict-encrypt auto-bump-version=false

🕑 Publishing the following connectors:
connectors/source-alloydb-strict-encrypt
https://github.com/airbytehq/airbyte/actions/runs/4404104180


Connector Did it publish? Were definitions generated?
connectors/source-alloydb-strict-encrypt

if you have connectors that successfully published but failed definition generation, follow step 4 here ▶️

@sergio-ropero

Manually bumped source-postgres connector.

@sergio-ropero sergio-ropero merged commit 771d218 into master Mar 13, 2023
@sergio-ropero sergio-ropero deleted the sergio/feat/cdc-checkpointing branch March 13, 2023 13:03
danielduckworth pushed a commit to danielduckworth/airbyte that referenced this pull request Mar 13, 2023
…21727)

* This commit adds new functionality that generates checkpoints when doing CDC synchronization.
For that purpose we encapsulate an AirbyteMessage Iterator on a new iterator that handles the
checkpoint messaging.

* Reformat code

* Reformat code

* Reformat code

* Reformat code

* Second attempt with ugly if statement

* Add `isRecordBehindOffset` function to make sure is safe to send the state.
Tests are failing as now it has more state messages:
expected: <1> but was: <3>

* Code formatting

* Add additional check if the record is part of the snapshot load to skip state message.

* Remove comments

* Fix imports

* Fix format

* Add check if the iterator has extra elements so we don't send state message twice (edge case)

* Add a new check to avoid sending multiple state messages with same offset.
Fix PR comments.
Not sending checkpoints... figuring out

* Modify MSSQL and MySQL implementations

* Adds better control on Maps and include a test for time checkpoint.
Also adds extra assert to verify there are no duplicate states

* Formatting

* Improve code documentation and use default for CdcStateHandler new functions

* Sort out missing `final` and types from comments

* Minor improve in checkpoint validation

* format files

* It's 2023!

* Import issues

* Changes after merging master

* Upgrade Debezium version in MySQL

* Bump Postgres and Alloydb

* auto-bump connector version

* Manually bumping version

---------

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
adriennevermorel pushed a commit to adriennevermorel/airbyte that referenced this pull request Mar 17, 2023
erohmensing pushed a commit that referenced this pull request Mar 22, 2023
erohmensing pushed a commit that referenced this pull request Mar 22, 2023